昨天我們建立了 chat-service 和 RAG 檢索系統,但用戶體驗還有個大問題:等待黑盒子。用戶發問後只能乾等 10-30 秒,不知道系統在做什麼。
這篇要用 GCP 原生服務 打造真正的即時推播,讓用戶看到每個處理步驟的進度。
目標:零維護的即時推播系統,支援百萬級用戶併發,離線也能收到通知。
flowchart TB
    %% Client applications and the Firebase SDK they both go through.
    subgraph "前端層"
        WEB[Web App<br/>React/Vue]
        MOBILE[Mobile App<br/>Flutter/React Native]
        FIREBASE_SDK[Firebase SDK<br/>即時監聽]
    end
    %% Managed GCP / Firebase services used for storage, eventing and push.
    subgraph "GCP 託管服務層"
        FIRESTORE[Firestore<br/>即時資料庫]
        FUNCTIONS[Cloud Functions<br/>事件處理]
        PUBSUB[Pub/Sub<br/>事件匯流排]
        EVENTARC[Eventarc<br/>事件路由]
        FCM[Firebase Cloud Messaging<br/>推播通知]
    end
    %% Application services that publish events.
    subgraph "應用服務層"
        CHAT[chat-service]
        RAG[rag-service<br/>Agent Builder]
        WORKER[worker-service<br/>Cloud Run Jobs]
    end
    %% Edges: clients <-> Firestore via the SDK; services -> Pub/Sub ->
    %% Eventarc -> Cloud Functions, which write to Firestore and send FCM pushes.
    subgraph "即時資料流"
        WEB --> FIREBASE_SDK
        MOBILE --> FIREBASE_SDK
        FIREBASE_SDK <--> FIRESTORE
        CHAT --> PUBSUB
        RAG --> PUBSUB
        WORKER --> PUBSUB
        PUBSUB --> EVENTARC
        EVENTARC --> FUNCTIONS
        FUNCTIONS --> FIRESTORE
        FUNCTIONS --> FCM
    end
需求 | ❌ 傳統 WebSocket | ✅ Firebase 方案 | 優勢 |
---|---|---|---|
即時連接 | 自建 WebSocket 伺服器 | Firestore 即時監聽 | 零維護、自動擴展 |
斷線重連 | 自己寫重連邏輯 | Firebase SDK 自動處理 | 穩定性更高 |
負載均衡 | Sticky Session 配置 | Firebase 自動分散 | 無狀態,更靈活 |
離線支援 | 需自建快取機制 | 內建離線快取 | 用戶體驗更好 |
跨平台 | 每個平台不同實作 | 統一 Firebase SDK | 開發效率高 |
擴展性 | 需自己處理併發 | Google 基礎設施 | 百萬級併發 |
// Firestore collection layout (illustrative schema sketch, not executable code)
collections:
  // One document per chat, keyed by chat id.
  chats: {
    [chatId]: {
      user_id: string,
      status: 'pending' | 'processing' | 'completed' | 'error',
      current_step: string,
      progress: number, // 0-100
      // Per-step state. update_chat_progress() writes `steps.<step>` for whatever
      // step name it is given (the handlers also pass 'load_context',
      // 'task_scheduled' and 'streaming_response'), so this map can hold more
      // keys than the four shown here.
      steps: {
        'receive_message': { status: 'completed', timestamp: '...' },
        'rag_search': { status: 'processing', progress: 30 },
        'llm_generation': { status: 'pending' },
        'response_ready': { status: 'pending' }
      },
      // Conversation entries, appended in order.
      messages: [
        {
          role: 'user',
          content: string,
          timestamp: timestamp
        },
        {
          role: 'assistant',
          content: string,
          timestamp: timestamp,
          sources: [...], // RAG sources
          partial: boolean // true when this entry holds only part of the answer
        }
      ],
      metadata: {
        processing_time_ms: number,
        rag_results_count: number,
        error_message: string
      },
      created_at: timestamp,
      updated_at: timestamp
    }
  },
  // One document per user, keyed by user id.
  user_sessions: {
    [userId]: {
      active_chats: [chatId, ...],
      preferences: {...},
      last_seen: timestamp
    }
  }
// firestore.rules
rules_version = '2';
service cloud.firestore {
  match /databases/{database}/documents {
    // Chat documents: only the owning user may access them.
    match /chats/{chatId} {
      // `resource` is the document already stored, so it can only be
      // consulted for operations on an existing document.
      allow read, delete: if request.auth != null &&
        request.auth.uid == resource.data.user_id;
      // On create nothing is stored yet; the owner check has to look at the
      // incoming document (`request.resource`) instead.
      allow create: if request.auth != null &&
        request.auth.uid == request.resource.data.user_id;
      // The owner may update, but may not hand the chat to another user.
      allow update: if request.auth != null &&
        request.auth.uid == resource.data.user_id &&
        request.resource.data.user_id == resource.data.user_id;
    }
    // User session documents: the document id is the user's uid.
    match /user_sessions/{userId} {
      allow read, write: if request.auth != null &&
        request.auth.uid == userId;
    }
    // Explicit deny for every other path. Rules are allow-lists, so this
    // grants nothing and does not override the matches above.
    match /{document=**} {
      allow read, write: if false;
    }
  }
}
#!/bin/bash
# scripts/setup-firestore.sh
#
# Enables the Firebase/Firestore APIs, creates the Firestore database, deploys
# the security rules and creates the composite index used to list a user's
# chats by most recent update.
#
# Usage: PROJECT_ID=my-project ./scripts/setup-firestore.sh
set -euo pipefail

# Allow the caller to override the project; fall back to the placeholder.
PROJECT_ID="${PROJECT_ID:-your-project-id}"
echo "🔥 設定 Firebase 和 Firestore..."
# 1. Enable the Firebase APIs
gcloud services enable firebase.googleapis.com --project="$PROJECT_ID"
gcloud services enable firestore.googleapis.com --project="$PROJECT_ID"
# 2. Add Firebase to the project (if not already added). The command fails
#    when Firebase is already enabled, which must not abort a re-run.
firebase projects:addfirebase "$PROJECT_ID" \
  || echo "Firebase already enabled for $PROJECT_ID (or could not be added); continuing"
# 3. Create the Firestore database. Also fails when the database already exists.
gcloud firestore databases create --location=asia-east1 --project="$PROJECT_ID" \
  || echo "Firestore database already exists (or could not be created); continuing"
# 4. Deploy the security rules
firebase deploy --only firestore:rules --project "$PROJECT_ID"
# 5. Create the composite index (user_id ASC, updated_at DESC) on `chats`
gcloud firestore indexes composite create \
  --project="$PROJECT_ID" \
  --collection-group=chats \
  --field-config=field-path=user_id,order=ASCENDING \
  --field-config=field-path=updated_at,order=DESCENDING \
  || echo "Composite index already exists (or could not be created); continuing"
echo "✅ Firestore 設定完成"
# shared/firebase_client.py
import firebase_admin
from firebase_admin import credentials, firestore, messaging
from typing import Dict, Any, Optional, List
import logging
import json
from datetime import datetime
logger = logging.getLogger(__name__)
class FirebaseClient:
    """Single entry point for the Firestore documents that back a chat.

    Each chat lives in ``chats/{chat_id}``; per-user bookkeeping lives in
    ``user_sessions/{user_id}``.

    The methods are declared ``async`` but call the synchronous Firestore
    client directly, so each write blocks the running event loop until it
    returns.  Failures are logged rather than raised; only
    ``create_chat_session`` reports the outcome through its return value.
    """

    def __init__(self, project_id: str):
        """Initialise the Firebase Admin app (once per process) and Firestore.

        Args:
            project_id: GCP project passed to ``initialize_app`` as ``projectId``.
        """
        self.project_id = project_id
        # Only initialise when no app has been registered yet.
        if not firebase_admin._apps:
            cred = credentials.ApplicationDefault()
            firebase_admin.initialize_app(cred, {
                'projectId': project_id
            })
        self.db = firestore.client()

    @staticmethod
    def _now() -> datetime:
        """Return the current time as a timezone-aware datetime.

        A naive ``datetime.now()`` carries no UTC offset, so the instant that
        ends up stored depends on how the client library interprets it.
        Attaching the local offset makes the value unambiguous.
        """
        return datetime.now().astimezone()

    async def create_chat_session(self, chat_id: str, user_id: str, initial_message: str) -> bool:
        """Create ``chats/{chat_id}`` seeded with the user's first message.

        ``set()`` is called without merge, so an existing document with the
        same id is replaced.

        Args:
            chat_id: Document id of the chat.
            user_id: Owner of the chat; stored as ``user_id``.
            initial_message: Text stored as the first ``user`` message.

        Returns:
            True if the chat document was written, False if writing failed.
        """
        try:
            # One instant for every timestamp of the freshly created document.
            now = self._now()
            chat_ref = self.db.collection('chats').document(chat_id)
            chat_data = {
                'user_id': user_id,
                'status': 'pending',
                'current_step': 'receive_message',
                'progress': 0,
                'steps': {
                    'receive_message': {'status': 'completed', 'timestamp': now},
                    'rag_search': {'status': 'pending'},
                    'llm_generation': {'status': 'pending'},
                    'response_ready': {'status': 'pending'},
                },
                'messages': [
                    {
                        'role': 'user',
                        'content': initial_message,
                        'timestamp': now,
                    }
                ],
                'metadata': {},
                'created_at': now,
                'updated_at': now,
            }
            chat_ref.set(chat_data)
            # Record the chat under the user's session document.
            await self._update_user_session(user_id, chat_id)
            return True
        except Exception as e:
            logger.error(f"建立聊天會話失敗: {e}")
            return False

    async def update_chat_progress(
        self,
        chat_id: str,
        step: str,
        progress: int,
        status: str = 'processing',
        message: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        """Write the current step, progress and status of a chat.

        Args:
            chat_id: Document id of the chat.
            step: Step name; written to ``current_step`` and ``steps.<step>``.
            progress: Overall progress, also stored on the step itself.
            status: Written to both the chat and the step.
            message: Optional text stored at ``steps.<step>.message``.
            metadata: Optional key/value pairs stored under ``metadata.<key>``.
        """
        try:
            now = self._now()
            chat_ref = self.db.collection('chats').document(chat_id)
            update_data = {
                'current_step': step,
                'progress': progress,
                'status': status,
                f'steps.{step}.status': status,
                f'steps.{step}.timestamp': now,
                'updated_at': now,
            }
            # `is not None` rather than truthiness: 0 is a valid progress value.
            if progress is not None:
                update_data[f'steps.{step}.progress'] = progress
            if message:
                update_data[f'steps.{step}.message'] = message
            if metadata:
                for key, value in metadata.items():
                    update_data[f'metadata.{key}'] = value
            # All fields go out in a single update() call.
            chat_ref.update(update_data)
            logger.info(f"聊天 {chat_id} 進度更新: {step} - {progress}%")
        except Exception as e:
            logger.error(f"更新聊天進度失敗: {e}")

    async def add_message(
        self,
        chat_id: str,
        role: str,
        content: str,
        partial: bool = False,
        sources: Optional[List[Dict]] = None
    ):
        """Append one message to the chat's ``messages`` array.

        Args:
            chat_id: Document id of the chat.
            role: Message author, e.g. ``'user'`` or ``'assistant'``.
            content: Message text.
            partial: Stored on the message as its ``partial`` flag.
            sources: Optional source list stored on the message.
        """
        try:
            now = self._now()
            chat_ref = self.db.collection('chats').document(chat_id)
            new_message = {
                'role': role,
                'content': content,
                'timestamp': now,
                'partial': partial,
            }
            if sources:
                new_message['sources'] = sources
            # ArrayUnion appends without rewriting the whole array.
            chat_ref.update({
                'messages': firestore.ArrayUnion([new_message]),
                'updated_at': now,
            })
        except Exception as e:
            logger.error(f"添加訊息失敗: {e}")

    async def complete_chat(
        self,
        chat_id: str,
        final_response: str,
        processing_time_ms: int,
        sources: Optional[List[Dict]] = None
    ):
        """Store the final answer and mark the chat as completed.

        Args:
            chat_id: Document id of the chat.
            final_response: Assistant answer; skipped when empty.
            processing_time_ms: Stored at ``metadata.processing_time_ms``.
            sources: Optional source list attached to the final message.
        """
        try:
            chat_ref = self.db.collection('chats').document(chat_id)
            if final_response:
                await self.add_message(
                    chat_id,
                    'assistant',
                    final_response,
                    partial=False,
                    sources=sources
                )
            now = self._now()
            chat_ref.update({
                'status': 'completed',
                'current_step': 'response_ready',
                'progress': 100,
                'steps.response_ready.status': 'completed',
                'steps.response_ready.timestamp': now,
                'metadata.processing_time_ms': processing_time_ms,
                'updated_at': now,
            })
        except Exception as e:
            logger.error(f"完成聊天失敗: {e}")

    async def error_chat(self, chat_id: str, error_message: str):
        """Mark the chat as failed and store the error text in its metadata."""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)
            chat_ref.update({
                'status': 'error',
                'metadata.error_message': error_message,
                'updated_at': self._now(),
            })
        except Exception as e:
            logger.error(f"設定聊天錯誤狀態失敗: {e}")

    async def _update_user_session(self, user_id: str, chat_id: str):
        """Add ``chat_id`` to the user's active chats and refresh ``last_seen``.

        A merging ``set()`` creates the document when it is missing and
        otherwise touches only the two fields below.  Falling back to a plain
        ``set()`` after *any* failed ``update()`` would replace an existing
        session document whenever the update failed for an unrelated reason.
        ``preferences`` is deliberately not written here, so existing
        preferences are never reset; readers should treat a missing value as
        empty.
        """
        try:
            user_ref = self.db.collection('user_sessions').document(user_id)
            user_ref.set(
                {
                    'active_chats': firestore.ArrayUnion([chat_id]),
                    'last_seen': self._now(),
                },
                merge=True,
            )
        except Exception as e:
            logger.error(f"更新用戶會話失敗: {e}")
# Process-wide FirebaseClient instance, created lazily by get_firebase_client().
firebase_client: Optional[FirebaseClient] = None


def get_firebase_client(project_id: str = None) -> FirebaseClient:
    """Return the shared FirebaseClient, building it on the first call.

    Args:
        project_id: Project to initialise with. When falsy, the value of the
            ``GCP_PROJECT_ID`` environment variable is used instead. Ignored
            once the shared instance exists.
    """
    global firebase_client
    if firebase_client is not None:
        return firebase_client
    if not project_id:
        import os
        project_id = os.getenv('GCP_PROJECT_ID')
    firebase_client = FirebaseClient(project_id)
    return firebase_client
# services/chat/app/handlers.py (Firebase 整合版)
import asyncio
import time
from typing import Dict, Any
from shared.firebase_client import get_firebase_client
from .models import ChatRequest, ChatResponse
class ChatHandler:
    """Chat request handler that mirrors each processing step into Firestore."""

    def __init__(self):
        # ... existing initialisation (omitted in this excerpt) ...
        # NOTE(review): presumably this is where self.publisher and
        # self.topic_path, used by _publish_task_event_firebase, are set — confirm.
        self.firebase = get_firebase_client()

    async def process_chat(self, request: ChatRequest) -> ChatResponse:
        """Handle one chat request end to end (Firebase-integrated version).

        Creates the chat document, loads the user's context, then dispatches
        to the sync or async path.  This is the single place where a failure
        is written to the chat document; the exception is then re-raised.
        """
        start_time = time.time()
        chat_id = request.chat_id or str(uuid.uuid4())
        try:
            # 1. Create the Firestore chat session
            await self.firebase.create_chat_session(
                chat_id,
                request.user_id,
                request.message
            )
            # 2. Load the user's context
            await self.firebase.update_chat_progress(
                chat_id,
                'load_context',
                10,
                'processing',
                '正在載入對話記憶...'
            )
            context = await self._load_user_context(request.user_id, chat_id)
            # 3. Choose the processing mode
            processing_mode = self._determine_processing_mode(request.message, context)
            if processing_mode == ProcessingMode.SYNC:
                return await self._handle_sync_firebase(request, chat_id, context, start_time)
            return await self._handle_async_firebase(request, chat_id, context, start_time)
        except Exception as e:
            await self.firebase.error_chat(chat_id, str(e))
            raise

    async def _handle_sync_firebase(
        self,
        request: ChatRequest,
        chat_id: str,
        context: Dict,
        start_time: float
    ) -> ChatResponse:
        """Answer inline: call the LLM, persist the exchange, complete the chat.

        Exceptions propagate to ``process_chat``, which records them on the
        chat document; recording them here as well would only repeat the write.
        """
        await self.firebase.update_chat_progress(
            chat_id,
            'llm_generation',
            50,
            'processing',
            '正在生成回應...'
        )
        # Build the prompt and call the LLM
        prompt = self._build_simple_prompt(request.message, context)
        response = await self._call_gemini(prompt, max_tokens=150)
        # Persist the conversation
        await self._save_conversation(chat_id, request.user_id, request.message, response)
        # Mark the chat as completed
        processing_time = int((time.time() - start_time) * 1000)
        await self.firebase.complete_chat(
            chat_id,
            response,
            processing_time
        )
        return ChatResponse(
            message=response,
            chat_id=chat_id,
            processing_mode=ProcessingMode.SYNC,
            is_complete=True,
            requires_followup=False
        )

    async def _handle_async_firebase(
        self,
        request: ChatRequest,
        chat_id: str,
        context: Dict,
        start_time: float
    ) -> ChatResponse:
        """Queue the work for background processing and acknowledge immediately.

        Exceptions propagate to ``process_chat``, which records them on the
        chat document.
        """
        quick_response = "我收到您的問題,正在仔細處理中..."
        # Write the "scheduled" progress BEFORE publishing. Once the event is
        # published the background worker starts writing its own (higher)
        # progress values, and a late write of 20% would overwrite them.
        await self.firebase.update_chat_progress(
            chat_id,
            'task_scheduled',
            20,
            'processing',
            '任務已排程,正在檢索相關資料...'
        )
        await self._publish_task_event_firebase(request, chat_id, context)
        return ChatResponse(
            message=quick_response,
            chat_id=chat_id,
            processing_mode=ProcessingMode.ASYNC,
            is_complete=False,
            requires_followup=True,
            metadata={"estimated_time": "10-30秒"}
        )

    async def _publish_task_event_firebase(
        self,
        request: ChatRequest,
        chat_id: str,
        context: Dict
    ):
        """Publish the task event to Pub/Sub (flagged for Firebase updates).

        Does nothing beyond logging when no publisher is configured.

        Raises:
            RuntimeError: If building or publishing the event fails. The
                caller must not report the task as scheduled in that case;
                swallowing the error here would leave the chat waiting for a
                worker that was never triggered.
        """
        if not self.publisher:
            logger.info("開發環境:模擬發送任務事件")
            return
        try:
            event = {
                "task_id": str(uuid.uuid4()),
                "chat_id": chat_id,
                "user_id": request.user_id,
                "message": request.message,
                "context": context,
                "firebase_enabled": True,  # tells the worker to write progress to Firestore
                "created_at": datetime.now().isoformat()
            }
            message_data = json.dumps(event).encode('utf-8')
            future = self.publisher.publish(self.topic_path, message_data)
            # result() waits for the publish to be acknowledged.
            logger.info(f"任務事件已發送到 Pub/Sub: {future.result()}")
        except Exception as e:
            logger.error(f"發送任務事件失敗: {e}")
            # Same text the chat document used to receive for this failure.
            raise RuntimeError(f"發送任務事件失敗: {e}") from e
# functions/chat_event_handler/main.py
import functions_framework
from google.cloud import pubsub_v1
import json
import asyncio
import logging
from shared.firebase_client import get_firebase_client
from services.rag.app.agent_builder_client import AgentBuilderClient
import google.generativeai as genai
logger = logging.getLogger(__name__)
# Module-level initialisation: runs once when the module is imported.
import os

firebase_client = get_firebase_client()
# Prefer the deployed GCP_PROJECT_ID; the placeholder remains the fallback.
agent_builder = AgentBuilderClient(
    project_id=os.getenv("GCP_PROJECT_ID", "your-project-id")
)
@functions_framework.cloud_event
def handle_chat_task(cloud_event):
    """Cloud Function entry point for chat task events from Pub/Sub.

    Decodes the base64 JSON payload at ``cloud_event.data["message"]["data"]``
    and runs ``process_chat_task`` on it.  Any failure is logged and not
    re-raised.
    """
    # Local import: this module's import block does not bring in base64, so
    # the decode below used to fail with NameError on every message.
    import base64

    try:
        # Parse the Pub/Sub message
        message_data = cloud_event.data["message"]["data"]
        task_data = json.loads(base64.b64decode(message_data).decode())
        # Run the async pipeline to completion
        asyncio.run(process_chat_task(task_data))
    except Exception as e:
        logger.error(f"處理聊天任務失敗: {e}")
async def process_chat_task(task_data: dict):
    """Run RAG search, answer generation and completion for one task.

    Args:
        task_data: Decoded event; must contain ``chat_id``, ``user_id`` and
            ``message`` (a missing key raises ``KeyError`` to the caller).

    Errors raised inside the pipeline are written to the chat document and
    logged, not re-raised.
    """
    # Local import: this module's import block does not bring in time.
    import time

    started = time.monotonic()
    chat_id = task_data["chat_id"]
    user_id = task_data["user_id"]
    message = task_data["message"]
    try:
        # Step 1: RAG search
        await firebase_client.update_chat_progress(
            chat_id,
            'rag_search',
            30,
            'processing',
            '正在檢索相關文件...'
        )
        rag_results = await perform_rag_search(message, user_id)
        # Step 2: LLM generation
        await firebase_client.update_chat_progress(
            chat_id,
            'llm_generation',
            60,
            'processing',
            '正在生成詳細回答...'
        )
        final_response = await generate_rag_response(message, rag_results)
        # Step 3: stream long answers (optional)
        # NOTE(review): stream_response appends every chunk as its own message
        # and complete_chat appends the full answer again, so a streamed chat
        # ends up holding the text twice in `messages`.
        if len(final_response) > 500:
            await stream_response(chat_id, final_response)
        # Step 4: complete. The duration covers this function's work only,
        # not the time the event spent queued in Pub/Sub.
        elapsed_ms = int((time.monotonic() - started) * 1000)
        await firebase_client.complete_chat(
            chat_id,
            final_response,
            processing_time_ms=elapsed_ms,
            sources=extract_sources(rag_results)
        )
    except Exception as e:
        await firebase_client.error_chat(chat_id, str(e))
        logger.error(f"處理任務失敗 {chat_id}: {e}")
async def perform_rag_search(query: str, user_id: str) -> dict:
    """Search documents for ``query`` through Agent Builder.

    Requests 5 results per page.  ``user_id`` is accepted but not used by the
    search call.  On any failure the error is logged and an empty result set
    (``{"results": []}``) is returned instead.
    """
    try:
        return await agent_builder.search_documents(query=query, page_size=5)
    except Exception as e:
        logger.error(f"RAG 檢索失敗: {e}")
        return {"results": []}
async def generate_rag_response(query: str, rag_results: dict) -> str:
    """Generate an answer to ``query`` grounded in the RAG results.

    Uses at most the first three results, each truncated to 300 characters.
    On any failure the error is logged and a fixed apology text is returned.
    """
    # Local import: this module's import block does not bring in os.
    import os

    try:
        # Build the augmented prompt. `or ''` also covers a result whose
        # content is present but None, which would otherwise break the slice.
        context_parts = [
            f"參考資料:{(result.get('content') or '')[:300]}"
            for result in rag_results.get("results", [])[:3]
        ]
        enhanced_prompt = "\n".join([
            "基於以下參考資料回答問題:",
            "\n".join(context_parts),
            f"問題:{query}",
            "請提供詳細、準確的回答,並在回答末尾註明參考來源。",
        ]).strip()
        # Call the Gemini API. The key comes from the GEMINI_API_KEY
        # environment variable; the placeholder is only a fallback.
        genai.configure(api_key=os.getenv("GEMINI_API_KEY", "your-gemini-api-key"))
        model = genai.GenerativeModel('gemini-pro')
        response = model.generate_content(enhanced_prompt)
        return response.text
    except Exception as e:
        logger.error(f"生成回應失敗: {e}")
        return "抱歉,我在處理您的問題時遇到了一些困難。請稍後再試。"
async def stream_response(
    chat_id: str,
    full_response: str,
    chunk_size: int = 200,
    delay_seconds: float = 0.5,
):
    """Push a long answer to the chat in fixed-size chunks.

    Each chunk is appended as a partial assistant message and followed by a
    progress update that moves from 70 towards 100.

    Args:
        chat_id: Document id of the chat.
        full_response: Complete answer text to split.
        chunk_size: Characters per chunk (default 200).
        delay_seconds: Pause after each chunk to create a typing effect
            (default 0.5). Every chunk costs two writes to the same document,
            so a shorter delay raises the write rate accordingly.

    Failures are logged and not re-raised.
    """
    try:
        chunks = [
            full_response[start:start + chunk_size]
            for start in range(0, len(full_response), chunk_size)
        ]
        total = len(chunks)
        for index, chunk in enumerate(chunks, start=1):
            progress = int(index / total * 30) + 70  # 70-100%
            # Append the partial answer
            await firebase_client.add_message(
                chat_id,
                'assistant',
                chunk,
                partial=True
            )
            # Update the progress
            await firebase_client.update_chat_progress(
                chat_id,
                'streaming_response',
                progress,
                'processing',
                f'正在推送回應... ({index}/{total})'
            )
            await asyncio.sleep(delay_seconds)
    except Exception as e:
        logger.error(f"流式回應失敗: {e}")
def extract_sources(rag_results: dict) -> list:
    """Summarise the first three RAG results as source entries.

    Each entry holds the result's ``title`` (``"未知來源"`` when absent), a
    ``content_preview`` of at most 100 characters and the result's
    ``metadata``.  The preview ends in ``"..."`` only when the content was
    actually cut; missing or ``None`` content yields an empty preview.
    """
    sources = []
    for result in rag_results.get("results", [])[:3]:
        # `or ""` also covers content that is present but None.
        content = result.get("content") or ""
        preview = content[:100] + "..." if len(content) > 100 else content
        sources.append({
            "title": result.get("title", "未知來源"),
            "content_preview": preview,
            "metadata": result.get("metadata", {}),
        })
    return sources
# functions/chat_event_handler/function.yaml
# Deployment descriptor for the Pub/Sub-triggered chat event handler.
# NOTE(review): the nesting below is reconstructed from the key names (the
# source had lost its indentation); in particular confirm whether `timeout`
# belongs at the top level or under `resources`.
name: chat-event-handler
runtime: python310
source: .
entry_point: handle_chat_task
trigger:
  event_trigger:
    event_type: google.cloud.pubsub.topic.v1.messagePublished
    resource: projects/YOUR_PROJECT/topics/chat-tasks
environment_variables:
  GCP_PROJECT_ID: "your-project-id"
  # Placeholder only: do not commit a real key in this file.
  GEMINI_API_KEY: "your-gemini-api-key"
resources:
  memory: 512MB
  cpu: 1
timeout: 540s
service_account_email: "chat-functions-sa@your-project.iam.gserviceaccount.com"
#!/bin/bash
# scripts/deploy-functions.sh
#
# Creates the service account for the chat event handler, grants it Firestore
# and Pub/Sub access, and deploys the Pub/Sub-triggered Cloud Function.
#
# Usage: PROJECT_ID=my-project ./scripts/deploy-functions.sh
set -euo pipefail

# PROJECT_ID is used throughout but was never assigned in this script; take it
# from the environment and fall back to the placeholder.
PROJECT_ID="${PROJECT_ID:-your-project-id}"
SA_NAME="chat-functions-sa"
SA_EMAIL="${SA_NAME}@${PROJECT_ID}.iam.gserviceaccount.com"

echo "☁️ 部署 Cloud Functions..."
# 1. Create the service account (skipped when it already exists so the script
#    can be re-run)
if ! gcloud iam service-accounts describe "$SA_EMAIL" --project="$PROJECT_ID" >/dev/null 2>&1; then
  gcloud iam service-accounts create "$SA_NAME" \
    --project="$PROJECT_ID" \
    --display-name="Chat Functions Service Account"
fi
# 2. Grant permissions. Firestore access is granted through the Datastore
#    roles; there is no roles/firestore.user.
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
  --member="serviceAccount:${SA_EMAIL}" \
  --role="roles/datastore.user"
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
  --member="serviceAccount:${SA_EMAIL}" \
  --role="roles/pubsub.subscriber"
# 3. Deploy the function
gcloud functions deploy chat-event-handler \
  --project="$PROJECT_ID" \
  --runtime=python310 \
  --trigger-topic=chat-tasks \
  --source=functions/chat_event_handler \
  --entry-point=handle_chat_task \
  --service-account="$SA_EMAIL" \
  --set-env-vars="GCP_PROJECT_ID=$PROJECT_ID" \
  --memory=512MB \
  --timeout=540s \
  --region=asia-east1
echo "✅ Cloud Functions 部署完成"
// frontend/src/firebase/config.js
import { initializeApp } from 'firebase/app';
import { getFirestore } from 'firebase/firestore';
import { getAuth } from 'firebase/auth';
import { getMessaging } from 'firebase/messaging';
// Web app configuration passed to initializeApp(). All values here are
// placeholders to be replaced with the project's own settings.
const firebaseConfig = {
  apiKey: "your-api-key",
  authDomain: "your-project.firebaseapp.com",
  projectId: "your-project-id",
  storageBucket: "your-project.appspot.com",
  messagingSenderId: "123456789",
  appId: "your-app-id"
};
// Initialise the Firebase app
const app = initializeApp(firebaseConfig);
// Initialise the services; each one is bound to the app created above.
export const db = getFirestore(app);
export const auth = getAuth(app);
export const messaging = getMessaging(app);
export default app;
// frontend/src/components/ChatInterface.jsx
import React, { useState, useEffect, useRef } from 'react';
import { doc, onSnapshot, collection, addDoc } from 'firebase/firestore';
import { db } from '../firebase/config';
import { useAuth } from '../hooks/useAuth';
const ChatInterface = () => {
const [messages, setMessages] = useState([]);
const [inputMessage, setInputMessage] = useState('');
const [chatStatus, setChatStatus] = useState(null);
const [currentProgress, setCurrentProgress] = useState(0);
const [isLoading, setIsLoading] = useState(false);
const { user } = useAuth();
const messagesEndRef = useRef(null);
const currentChatId = useRef(null);
// 滾動到最新訊息
const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
};
useEffect(() => {
scrollToBottom();
}, [messages]);
// 監聽聊天狀態更新
useEffect(() => {
if (!currentChatId.current) return;
const unsubscribe = onSnapshot(
doc(db, 'chats', currentChatId.current),
(doc) => {
if (doc.exists()) {
const data = doc.data();
// 更新訊息
if (data.messages) {
setMessages(data.messages);
}
// 更新進度和狀態
setChatStatus(data.status);
setCurrentProgress(data.progress || 0);
// 如果完成,停止載入狀態
if (data.status === 'completed' || data.status === 'error') {
setIsLoading(false);
}
}
},
(error) => {
console.error('監聽聊天更新失敗:', error);
setIsLoading(false);
}
);
return () => unsubscribe();
}, [currentChatId.current]);
// 發送訊息
const sendMessage = async () => {
if (!inputMessage.trim() || isLoading || !user) return;
setIsLoading(true);
const messageText = inputMessage;
setInputMessage('');
try {
// 生成新的聊天 ID
const chatId = Date.now().toString();
currentChatId.current = chatId;
// 呼叫後端 API 開始處理
const response = await fetch('/api/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${await user.getIdToken()}`
},
body: JSON.stringify({
message: messageText,
user_id: user.uid,
chat_id: chatId,
processing_mode: 'async'
})
});
if (!response.ok) {
throw new Error('發送訊息失敗');
}
// 後端會處理 Firebase 更新,前端只需監聽
} catch (error) {
console.error('發送訊息失敗:', error);
setIsLoading(false);
alert('發送訊息失敗,請稍後重試');
}
};
// 處理按下 Enter 鍵
const handleKeyPress = (e) => {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault();
sendMessage();
}
};
return (
<div className="chat-interface">
{/* 聊天區域 */}
<div className="chat-messages">
{messages.map((message, index) => (
<div key={index} className={`message ${message.role}`}>
<div className="message-content">
{message.content}
{message.partial && <span className="typing-indicator">...</span>}
</div>
{message.sources && (
<div className="message-sources">
<strong>參考來源:</strong>
{message.sources.map((source, idx) => (
<span key={idx} className="source-tag">
{source.title}
</span>
))}
</div>
)}
<div className="message-time">
{new Date(message.timestamp?.toDate()).toLocaleTimeString()}
</div>
</div>
))}
{/* 載入指示器和進度條 */}
{isLoading && (
<div className="loading-indicator">
<div className="progress-container">
<div className="progress-bar">
<div
className="progress-fill"
style={{ width: `${currentProgress}%` }}
></div>
</div>
<div className="progress-text">
{getProgressMessage(chatStatus, currentProgress)}
</div>
</div>
</div>
)}
<div ref={messagesEndRef} />
</div>
{/* 輸入區域 */}
<div className="chat-input">
<textarea
value={inputMessage}
onChange={(e) => setInputMessage(e.target.value)}
onKeyPress={handleKeyPress}
placeholder="輸入您的問題..."
disabled={isLoading}
rows="3"
/>
<button
onClick={sendMessage}
disabled={isLoading || !inputMessage.trim()}
className="send-button"
>
{isLoading ? '處理中...' : '發送'}
</button>
</div>
</div>
);
};
// Map a progress percentage to the status line shown under the progress bar.
// `status` is accepted but does not influence the result.
const getProgressMessage = (status, progress) => {
  // [exclusive upper bound, text]; the first bound above `progress` wins.
  const stages = [
    [20, '正在接收您的問題...'],
    [40, '正在檢索相關文件...'],
    [70, '正在分析和生成回答...'],
    [90, '正在完善回答內容...'],
  ];
  const match = stages.find(([upperBound]) => progress < upperBound);
  return match ? match[1] : '即將完成...';
};
export default ChatInterface;
/* frontend/src/components/ChatInterface.css */

/* Full-height column: scrolling message list on top, input bar below. */
.chat-interface {
  display: flex;
  flex-direction: column;
  height: 100vh;
  max-width: 800px;
  margin: 0 auto;
  border: 1px solid #e0e0e0;
  border-radius: 8px;
  overflow: hidden;
}

.chat-messages {
  flex: 1;
  overflow-y: auto;
  padding: 20px;
  background-color: #f8f9fa;
}

/* Message bubbles: user messages align right, assistant messages left. */
.message {
  margin-bottom: 15px;
  animation: fadeIn 0.3s ease-in;
}

.message.user {
  text-align: right;
}

.message.assistant {
  text-align: left;
}

.message-content {
  display: inline-block;
  max-width: 70%;
  padding: 12px 16px;
  border-radius: 18px;
  word-wrap: break-word;
}

.message.user .message-content {
  background-color: #007bff;
  color: white;
}

.message.assistant .message-content {
  background-color: white;
  border: 1px solid #e0e0e0;
  color: #333;
}

/* Pulsing dots shown on partial messages. */
.typing-indicator {
  animation: pulse 1.5s infinite;
  color: #666;
}

.message-sources {
  margin-top: 8px;
  font-size: 12px;
  color: #666;
}

.source-tag {
  background-color: #e3f2fd;
  padding: 2px 6px;
  border-radius: 10px;
  margin-right: 5px;
  font-size: 10px;
}

.message-time {
  font-size: 11px;
  color: #999;
  margin-top: 5px;
}

/* Loading card with the progress bar. */
.loading-indicator {
  text-align: center;
  padding: 20px;
  background-color: white;
  border-radius: 10px;
  margin: 10px 0;
  border: 1px solid #e0e0e0;
}

.progress-container {
  margin: 10px 0;
}

.progress-bar {
  width: 100%;
  height: 6px;
  background-color: #e0e0e0;
  border-radius: 3px;
  overflow: hidden;
}

/* Single definition of the fill. The file used to declare .progress-fill
   twice, with the second rule overriding the first one's background; the
   values below are the ones that were actually in effect. */
.progress-fill {
  height: 100%;
  /* 200px-wide highlight band that @keyframes shimmer slides sideways. */
  background: linear-gradient(
    90deg,
    #007bff 25%,
    #66b3ff 50%,
    #007bff 75%
  );
  background-size: 200px 100%;
  border-radius: 3px;
  transition: width 0.3s ease;
  animation: shimmer 2s infinite;
}

.progress-text {
  margin-top: 10px;
  color: #666;
  font-size: 14px;
}

/* Input bar. */
.chat-input {
  display: flex;
  padding: 20px;
  border-top: 1px solid #e0e0e0;
  background-color: white;
}

.chat-input textarea {
  flex: 1;
  border: 1px solid #ddd;
  border-radius: 20px;
  padding: 12px 16px;
  font-size: 14px;
  resize: none;
  outline: none;
}

.chat-input textarea:focus {
  border-color: #007bff;
}

.send-button {
  margin-left: 10px;
  background-color: #007bff;
  color: white;
  border: none;
  border-radius: 20px;
  padding: 12px 24px;
  cursor: pointer;
  font-size: 14px;
  transition: background-color 0.2s;
}

.send-button:hover:not(:disabled) {
  background-color: #0056b3;
}

.send-button:disabled {
  background-color: #ccc;
  cursor: not-allowed;
}

/* Animations */
@keyframes fadeIn {
  from { opacity: 0; transform: translateY(10px); }
  to { opacity: 1; transform: translateY(0); }
}

@keyframes pulse {
  0%, 100% { opacity: 1; }
  50% { opacity: 0.5; }
}

@keyframes shimmer {
  0% { background-position: -200px 0; }
  100% { background-position: 200px 0; }
}
// frontend/public/firebase-messaging-sw.js
import { initializeApp } from 'firebase/app';
import { getMessaging, onBackgroundMessage } from 'firebase/messaging/sw';
// NOTE(review): this file sits in public/, which is typically served without
// bundling; confirm that the bare `firebase/...` imports above resolve in the
// deployed service worker.
const firebaseConfig = {
  // Your Firebase configuration
};
const app = initializeApp(firebaseConfig);
const messaging = getMessaging(app);

// Handle messages received while the page is in the background.
// NOTE(review): for payloads that carry a `notification` block the SDK may
// already display a notification by itself — confirm this does not show it twice.
onBackgroundMessage(messaging, (payload) => {
  console.log('收到背景推播:', payload);
  // Data-only messages have no `notification` block; fall back to `data`
  // rather than throwing on an undefined property access.
  const notification = payload.notification ?? {};
  const data = payload.data ?? {};
  const notificationTitle = notification.title ?? data.title ?? '';
  const notificationOptions = {
    body: notification.body ?? data.body ?? '',
    icon: '/icon-192x192.png',
    badge: '/badge-72x72.png',
    data,
    actions: [
      {
        action: 'open_chat',
        title: '查看對話'
      }
    ]
  };
  // Return the promise so the caller can wait for the notification to show.
  return self.registration.showNotification(notificationTitle, notificationOptions);
});

// Handle notification clicks
self.addEventListener('notificationclick', (event) => {
  event.notification.close();
  // event.action is '' when the notification body, not a button, was clicked;
  // both ways of clicking open the chat.
  if (event.action !== 'open_chat' && event.action !== '') return;
  const chatId = event.notification.data?.chat_id;
  if (!chatId) return;
  const url = `${self.location.origin}/chat/${chatId}`;
  event.waitUntil(
    self.clients.openWindow(url)
  );
});
// frontend/src/hooks/useNotifications.js
import { useState, useEffect } from 'react';
import { getMessaging, getToken, onMessage } from 'firebase/messaging';
import { messaging } from '../firebase/config';
/**
 * React hook that requests notification permission for a signed-in user,
 * registers the FCM token with the backend and shows foreground pushes as
 * browser notifications.
 *
 * @param {object|null} user Signed-in user (needs `uid` and `getIdToken()`).
 * @returns {{notificationPermission: string, fcmToken: (string|null),
 *            requestNotificationPermission: Function}}
 */
export const useNotifications = (user) => {
  // Browsers without the Notification API expose no global; treat that as denied.
  const notificationsSupported = typeof Notification !== 'undefined';
  const [notificationPermission, setNotificationPermission] = useState(
    notificationsSupported ? Notification.permission : 'denied'
  );
  const [fcmToken, setFcmToken] = useState(null);

  useEffect(() => {
    if (!user) return undefined;
    requestNotificationPermission();
    // onMessage returns its unsubscribe function; returning it as the effect
    // cleanup prevents listeners from piling up when `user` changes.
    return setupMessageListener();
  }, [user]);

  const requestNotificationPermission = async () => {
    if (!notificationsSupported || !user) return;
    try {
      const permission = await Notification.requestPermission();
      setNotificationPermission(permission);
      if (permission === 'granted') {
        const token = await getToken(messaging, {
          vapidKey: 'your-vapid-key'
        });
        setFcmToken(token);
        // Send the token to the backend for storage
        await saveFcmToken(user.uid, token);
      }
    } catch (error) {
      console.error('獲取推播權限失敗:', error);
    }
  };

  const setupMessageListener = () =>
    onMessage(messaging, (payload) => {
      console.log('收到前景推播:', payload);
      // Read the live permission. The state value captured when the listener
      // was registered is still the pre-prompt one, so checking it would
      // suppress every notification after the user grants permission.
      if (!notificationsSupported || Notification.permission !== 'granted') return;
      const notification = payload.notification ?? {};
      new Notification(notification.title ?? '', {
        body: notification.body,
        icon: notification.icon,
        data: payload.data
      });
    });

  const saveFcmToken = async (userId, token) => {
    try {
      await fetch('/api/user/fcm-token', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${await user.getIdToken()}`
        },
        body: JSON.stringify({ userId, fcmToken: token })
      });
    } catch (error) {
      console.error('儲存 FCM Token 失敗:', error);
    }
  };

  return {
    notificationPermission,
    fcmToken,
    requestNotificationPermission
  };
};
# shared/fcm_service.py
from firebase_admin import messaging
import logging
logger = logging.getLogger(__name__)
class FCMService:
    """Thin wrapper for sending Firebase Cloud Messaging notifications.

    The methods are declared ``async`` but ``messaging.send`` is called
    synchronously, so each send blocks the event loop until it returns.
    """

    def __init__(self):
        pass

    async def send_notification(
        self,
        fcm_token: str,
        title: str,
        body: str,
        data: "dict | None" = None
    ) -> bool:
        """Send one notification to a single device token.

        Args:
            fcm_token: Registration token of the target device.
            title: Notification title.
            body: Notification body text.
            data: Optional key/value payload. Values are converted to ``str``
                because FCM data payloads accept string values only.

        Returns:
            True if the message was handed to FCM, False if sending failed
            (the error is logged).
        """
        try:
            payload = {str(key): str(value) for key, value in (data or {}).items()}
            message = messaging.Message(
                notification=messaging.Notification(
                    title=title,
                    body=body
                ),
                data=payload,
                token=fcm_token
            )
            response = messaging.send(message)
            logger.info(f"推播通知已發送: {response}")
            return True
        except Exception as e:
            logger.error(f"發送推播通知失敗: {e}")
            return False

    async def send_chat_completion_notification(
        self,
        fcm_token: str,
        chat_id: str,
        preview: str
    ):
        """Notify that the assistant's reply is ready.

        The body is ``preview`` cut to 50 characters plus ``"..."`` when it is
        longer than that, otherwise ``preview`` unchanged.
        """
        body = preview[:50] + "..." if len(preview) > 50 else preview
        await self.send_notification(
            fcm_token,
            "AI 助手回覆完成",
            body,
            {
                "type": "chat_completion",
                "chat_id": chat_id
            }
        )

    async def send_processing_notification(
        self,
        fcm_token: str,
        chat_id: str,
        progress: int
    ):
        """Send a progress notification, but only when ``progress`` is exactly 50.

        Every other value is ignored so the user is not flooded with updates.
        """
        if progress != 50:
            return
        await self.send_notification(
            fcm_token,
            "正在處理您的問題",
            f"進度: {progress}%,預計還需要 10-15 秒",
            {
                "type": "processing_update",
                "chat_id": chat_id,
                "progress": str(progress)
            }
        )
# docker-compose.dev.yml
# Local development stack: chat, memory and RAG services, PostgreSQL and the
# frontend dev server. Credentials below are development-only defaults.
version: '3.8'
services:
  # Chat Service
  chat-service:
    build:
      context: .
      dockerfile: services/chat/Dockerfile
    ports:
      - "8080:8080"
    environment:
      - ENVIRONMENT=development
      - GCP_PROJECT_ID=your-project-id
      - FIREBASE_ENABLED=true
      - MEMORY_SERVICE_URL=http://memory-service:8081
      - RAG_SERVICE_URL=http://rag-service:8082
      - GEMINI_API_KEY=${GEMINI_API_KEY}
    depends_on:
      - memory-service
      - rag-service
  # Memory Service
  memory-service:
    build:
      context: .
      dockerfile: services/memory/Dockerfile
    ports:
      - "8081:8080"
    environment:
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/ai_assistant
    depends_on:
      - postgres
  # RAG Service (Agent Builder)
  rag-service:
    build:
      context: .
      dockerfile: services/rag/Dockerfile
    ports:
      - "8082:8080"
    environment:
      - GCP_PROJECT_ID=your-project-id
      # Fall back to the same placeholder as GCP_PROJECT_ID; with PROJECT_ID
      # unset the bucket name would otherwise expand to just "-documents".
      - DOCUMENTS_BUCKET=${PROJECT_ID:-your-project-id}-documents
  # PostgreSQL Database
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=ai_assistant
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
  # Frontend Development Server
  frontend:
    build:
      context: ./frontend
      dockerfile: Dockerfile.dev
    ports:
      - "3000:3000"
    environment:
      - REACT_APP_API_BASE_URL=http://localhost:8080
      - REACT_APP_FIREBASE_API_KEY=${FIREBASE_API_KEY}
      - REACT_APP_FIREBASE_PROJECT_ID=your-project-id
    volumes:
      - ./frontend/src:/app/src
      - ./frontend/public:/app/public
volumes:
  postgres_data:
#!/bin/bash
# scripts/deploy-with-firebase.sh
#
# Runs the full deployment: backend services, Cloud Functions, Firestore
# setup, frontend hosting, then the end-to-end smoke test.
set -euo pipefail

PROJECT_ID="${PROJECT_ID:-your-project-id}"
REGION="${REGION:-asia-east1}"
# Exported so the child scripts below see the same values; a plain assignment
# is not passed on to them.
export PROJECT_ID REGION

echo "🚀 部署完整的即時推播系統..."
# 1. Deploy the backend services
echo "📦 部署後端服務..."
./scripts/deploy-backend-services.sh
# 2. Deploy the Cloud Functions
echo "☁️ 部署 Cloud Functions..."
./scripts/deploy-functions.sh
# 3. Set up Firestore and Firebase
echo "🔥 設定 Firebase..."
./scripts/setup-firestore.sh
# 4. Deploy the frontend to Firebase Hosting (in a subshell, so the working
#    directory is restored even if a step fails)
echo "🌐 部署前端..."
(
  cd frontend
  npm run build
  firebase deploy --only hosting --project "$PROJECT_ID"
)
# 5. Push notifications
echo "📱 設定推播通知..."
# `messaging` is not a `firebase deploy --only` target, so the former
# `firebase deploy --only messaging` call failed. FCM needs no deploy step;
# the web push (VAPID) key is configured in the Firebase console.
# 6. Test the whole flow
echo "🧪 測試即時推播..."
./scripts/test-realtime-flow.sh
echo "✅ 即時推播系統部署完成!"
echo ""
echo "🔗 應用網址:"
echo " 前端: https://${PROJECT_ID}.web.app"
echo " API: https://chat-service-xxx.run.app"
echo ""
echo "📊 監控儀表板:"
echo " Firebase 控制台: https://console.firebase.google.com/project/${PROJECT_ID}"
echo " Cloud Functions 日誌: https://console.cloud.google.com/functions"
#!/bin/bash
# scripts/test-realtime-flow.sh
#
# Smoke test: creates a chat through the API, then walks through a simulated
# progress-monitoring loop (Firestore itself is not queried).
set -euo pipefail

echo "🧪 測試即時推播流程..."
API_BASE="${API_BASE:-https://chat-service-xxx.run.app}"
TEST_USER_ID="test-user-123"
TEST_MESSAGE="請分析人工智慧的發展趨勢"
# 1. Create a chat session
echo "1. 測試建立聊天會話..."
# Build the body with jq so quotes, spaces or backslashes in the values cannot
# break the JSON (the script already depends on jq for parsing).
PAYLOAD=$(jq -n \
  --arg message "$TEST_MESSAGE" \
  --arg user_id "$TEST_USER_ID" \
  '{message: $message, user_id: $user_id, processing_mode: "async"}')
CHAT_RESPONSE=$(curl -sS --fail -X POST "$API_BASE/chat" \
  -H "Content-Type: application/json" \
  -d "$PAYLOAD")
CHAT_ID=$(jq -r '.chat_id // empty' <<<"$CHAT_RESPONSE")
if [ -z "$CHAT_ID" ]; then
  echo "❌ 回應中沒有 chat_id: $CHAT_RESPONSE" >&2
  exit 1
fi
echo "聊天 ID: $CHAT_ID"
# 2. Monitor Firestore updates (simulated)
echo "2. 模擬監控 Firestore 更新..."
for i in {1..30}; do
  echo "檢查進度... ($i/30)"
  # A real check would query Firestore here; this loop only waits.
  sleep 2
  if [ "$i" -eq 15 ]; then
    echo "✅ 檢測到進度更新: RAG 檢索完成"
  fi
  if [ "$i" -eq 25 ]; then
    echo "✅ 檢測到進度更新: LLM 生成完成"
  fi
done
echo "✅ 即時推播流程測試完成"
# shared/firestore_monitor.py
from google.cloud import firestore
from google.cloud import monitoring_v3
import time
import logging
logger = logging.getLogger(__name__)
class FirestoreMonitor:
    """Records per-chat usage metrics in Firestore and Cloud Monitoring.

    The methods are declared ``async`` but use the synchronous Firestore and
    Monitoring clients, so each call blocks the event loop until it returns.
    """

    def __init__(self, project_id: str):
        """Create the Firestore and Cloud Monitoring clients for ``project_id``."""
        self.project_id = project_id
        # Bind Firestore to the same project the metrics are written to.
        self.db = firestore.Client(project=project_id)
        self.monitoring_client = monitoring_v3.MetricServiceClient()
        self.project_name = f"projects/{project_id}"

    async def record_chat_metrics(
        self,
        chat_id: str,
        user_id: str,
        processing_time_ms: int,
        message_count: int
    ):
        """Store one chat's metrics in ``chat_stats`` and Cloud Monitoring.

        Failures are logged and not re-raised.
        """
        try:
            # Per-chat statistics document
            stats_ref = self.db.collection('chat_stats').document(chat_id)
            stats_ref.set({
                'user_id': user_id,
                'processing_time_ms': processing_time_ms,
                'message_count': message_count,
                'timestamp': firestore.SERVER_TIMESTAMP
            })
            # Cloud Monitoring custom metrics
            await self._record_processing_time(processing_time_ms)
            await self._record_message_count(message_count)
        except Exception as e:
            logger.error(f"記錄聊天指標失敗: {e}")

    def _write_int_point(self, metric_type: str, value: int) -> None:
        """Write a single int64 point, timestamped now, to a custom metric."""
        series = monitoring_v3.TimeSeries()
        series.metric.type = metric_type
        series.resource.type = "global"
        now = time.time()
        seconds = int(now)
        nanos = int((now - seconds) * 10 ** 9)
        interval = monitoring_v3.TimeInterval({
            "end_time": {"seconds": seconds, "nanos": nanos}
        })
        point = monitoring_v3.Point({
            "interval": interval,
            "value": {"int64_value": value}
        })
        series.points = [point]
        self.monitoring_client.create_time_series(
            name=self.project_name,
            time_series=[series]
        )

    async def _record_processing_time(self, processing_time_ms: int):
        """Record the processing time metric; failures are logged."""
        try:
            self._write_int_point(
                "custom.googleapis.com/chat/processing_time",
                processing_time_ms,
            )
        except Exception as e:
            logger.error(f"記錄處理時間指標失敗: {e}")

    async def _record_message_count(self, message_count: int):
        """Record the message count metric; failures are logged.

        ``record_chat_metrics`` always called this method, but it was never
        defined, so every call ended in a logged AttributeError.
        """
        try:
            self._write_int_point(
                "custom.googleapis.com/chat/message_count",
                message_count,
            )
        except Exception as e:
            logger.error(f"記錄訊息數量指標失敗: {e}")

    async def get_daily_stats(self, date: str = None) -> dict:
        """Aggregate the ``chat_stats`` documents of one day.

        Args:
            date: Day as ``YYYY-MM-DD``; defaults to the current UTC day.

        Returns:
            Totals and averages for the day, or ``{}`` when the query fails
            or ``date`` cannot be parsed.
        """
        try:
            from datetime import date as dt, datetime, time as dtime, timedelta, timezone

            day = dt.fromisoformat(date) if date else datetime.now(timezone.utc).date()
            # `timestamp` holds Firestore timestamps, so the range bounds must
            # be datetimes. Bounds are [00:00 UTC, next day 00:00 UTC).
            day_start = datetime.combine(day, dtime.min, tzinfo=timezone.utc)
            day_end = day_start + timedelta(days=1)
            stats_query = self.db.collection('chat_stats') \
                .where('timestamp', '>=', day_start) \
                .where('timestamp', '<', day_end)
            total_chats = 0
            total_processing_time = 0
            total_messages = 0
            for doc in stats_query.stream():
                data = doc.to_dict()
                total_chats += 1
                total_processing_time += data.get('processing_time_ms', 0)
                total_messages += data.get('message_count', 0)
            return {
                'date': day.isoformat(),
                'total_chats': total_chats,
                'average_processing_time_ms': total_processing_time / total_chats if total_chats > 0 else 0,
                'total_messages': total_messages,
                'average_messages_per_chat': total_messages / total_chats if total_chats > 0 else 0
            }
        except Exception as e:
            logger.error(f"獲取每日統計失敗: {e}")
            return {}
# shared/performance_optimizer.py
import asyncio
from typing import List, Dict, Any
import logging
logger = logging.getLogger(__name__)
class FirebaseOptimizer:
    """Helpers for keeping Firestore usage efficient."""

    @staticmethod
    def batch_firestore_writes(
        updates: List[Dict[str, Any]],
        batch_size: int = 500,
    ) -> List[List[Dict[str, Any]]]:
        """Split ``updates`` into consecutive groups of at most ``batch_size``.

        Args:
            updates: Pending write operations, in order.
            batch_size: Maximum operations per group (default 500).

        Returns:
            The groups in the original order; an empty list for no updates.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        return [
            updates[start:start + batch_size]
            for start in range(0, len(updates), batch_size)
        ]

    @staticmethod
    def optimize_listener_queries(user_id: str) -> Dict[str, Any]:
        """Describe a listener query limited to one user's 10 latest chats.

        Narrowing the listener to the user's own documents avoids reads for
        chats the client does not need.
        """
        return {
            'collection': 'chats',
            'where': [('user_id', '==', user_id)],
            'orderBy': ('updated_at', 'desc'),
            'limit': 10  # only the 10 most recently updated chats
        }

    @staticmethod
    async def cleanup_old_chats(firebase_client, days: int = 30):
        """Delete up to 100 chats whose ``updated_at`` is older than ``days`` days.

        Call repeatedly to work through a larger backlog.  Failures are
        logged and not re-raised.

        Args:
            firebase_client: Object exposing the Firestore client as ``.db``.
            days: Age threshold in days (default 30).
        """
        try:
            from datetime import datetime, timedelta

            # Timezone-aware cutoff so the comparison against stored
            # timestamps does not depend on how a naive value is interpreted.
            cutoff_date = datetime.now().astimezone() - timedelta(days=days)
            old_chats_query = firebase_client.db.collection('chats') \
                .where('updated_at', '<', cutoff_date) \
                .limit(100)  # one bounded batch per call
            deleted_count = 0
            for doc in old_chats_query.stream():
                doc.reference.delete()
                deleted_count += 1
            logger.info(f"清理了 {deleted_count} 個舊聊天記錄")
        except Exception as e:
            logger.error(f"清理舊聊天記錄失敗: {e}")